<?php
declare(strict_types=1);

namespace App\Master\Framework\Library\Mqtt;

use function Hyperf\Coroutine\co;

class Subscribe
{
    /**
     * Subscribe to MQTT topics and dispatch every received message, forever.
     *
     * A new MqttClient is created and subscribed to $topic. The method then
     * loops without a termination condition: each received message is passed
     * to $function inside a new coroutine, and a ping is sent once the
     * client's keep-alive interval has elapsed since the last successful ping.
     *
     * @param array $topic Topic(s) passed unchanged to MqttClient::subscribe().
     * @param $function Handler invoked as $function($message) for each message.
     * @return mixed The loop has no exit path, so this method does not return normally.
     */
    public function endlessLoop(array $topic,$function): mixed
    {
        $mqtt = new MqttClient();
        // Subscribe to one or more topics.
        $mqtt->subscribe($topic);

        // Timestamp of the last successful ping (initially: loop start).
        $lastPingAt = time();

        for (;;) {
            // Receive the next subscribed message.
            $payload = $mqtt->recv();

            // Falsy results and a literal `true` are not dispatched.
            // NOTE(review): presumably `true` signals a receive timeout — confirm against MqttClient::recv().
            $hasMessage = $payload && $payload !== true;
            if ($hasMessage) {
                // Run the handler in its own coroutine so the receive loop is not blocked by it.
                co(function () use ($function, $payload) {
                    $function($payload);
                });
            }

            // Heartbeat: ping once the keep-alive interval has elapsed. The timestamp is
            // only advanced when ping() reports success, so a failed ping is retried on
            // the next iteration.
            $pingDueBefore = time() - $mqtt->getKeepAlive();
            if ($lastPingAt <= $pingDueBefore && $mqtt->ping()) {
                $lastPingAt = time();
            }
        }
    }
}